package com.three.netty.core.router;

import com.google.common.eventbus.Subscribe;
import com.three.api.connection.Connection;
import com.three.api.connection.SessionContext;
import com.three.api.event.RouterChangeEvent;
import com.three.api.router.ClientLocation;
import com.three.api.router.Router;
import com.three.api.spi.common.MQClient;
import com.three.api.spi.common.MQClientFactory;
import com.three.api.spi.common.MQMessageReceiver;
import com.three.common.message.base.KickPlayerMessage;
import com.three.common.message.base.gateway.GatewayKickPlayerMessage;
import com.three.common.router.KickRemoteMsg;
import com.three.common.router.RemoteRouter;
import com.three.config.common.IConfig;
import com.three.event.EventConsumer;
import com.three.netty.core.server.GatewayUDPConnector;
import com.three.tools.Utils;
import com.three.utils.JsonUtils;
import com.three.utils.LogUtils;

import java.net.InetSocketAddress;

import static com.three.common.ServerNodes.GS;

/**
 * Created by mathua on 2017/5/25.
 */
public final class RouterChangeListener extends EventConsumer implements MQMessageReceiver {
    public static final String KICK_CHANNEL_ = "/chess/kick/";
    private final String kick_channel = KICK_CHANNEL_ + GS.hostAndPort();
    private final boolean udpGateway = IConfig.chess.net.udpGateway();
    private MQClient mqClient;

    public RouterChangeListener() {
        if (!udpGateway) {
            mqClient = MQClientFactory.create();
            mqClient.subscribe(getKickChannel(), this);
        }
    }

    public String getKickChannel() {
        return kick_channel;
    }

    public String getKickChannel(String hostAndPort) {
        return KICK_CHANNEL_ + hostAndPort;
    }

    @Subscribe
    void on(RouterChangeEvent event) {
        long playerId = event.playerId;
        Router<?> r = event.router;
        if (r.getRouteType().equals(Router.RouterType.LOCAL)) {
            kickLocal(playerId, (LocalRouter) r);
        } else {
            kickRemote(playerId, (RemoteRouter) r);
        }
    }

    /**
     * 发送踢人消息到客户端
     *
     * @param playerId
     * @param router
     */
    private void kickLocal(final long playerId, final LocalRouter router) {
        Connection connection = router.getRouteValue();
        SessionContext context = connection.getSessionContext();
        KickPlayerMessage message = KickPlayerMessage.build(connection);
        message.setDeviceId(context.getDeviceId());
        message.setPlayerId(playerId);
        message.send(future -> {
            if (future.isSuccess()) {
                LogUtils.CONN.info("kick local connection success, playerId={}, router={}, conn={}", playerId, router, connection);
            } else {
                LogUtils.CONN.warn("kick local connection failure, playerId={}, router={}, conn={}", playerId, router, connection);
            }
        });
    }

    /**
     * 广播踢人消息到消息中心（redis）.
     * <p>
     * 有可能目标机器是当前机器，所以要做一次过滤
     * 如果client连续2次链接到同一台机器上就有会出现这中情况
     *
     * @param playerId
     * @param remoteRouter
     */
    private void kickRemote(long playerId, RemoteRouter remoteRouter) {
        ClientLocation location = remoteRouter.getRouteValue();
        //1.如果目标机器是当前机器，就不要再发送广播了，直接忽略
        if (location.isThisPC(GS.getHost(), GS.getPort())) {
            LogUtils.CONN.debug("kick remote router in local pc, ignore remote broadcast, playerId={}", playerId);
            return;
        }

        if (udpGateway) {
            Connection connection = GatewayUDPConnector.I().getConnection();
            GatewayKickPlayerMessage.build(connection)
                    .setPlayerId(playerId)
                    .setClientType(location.getClientType())
                    .setConnId(location.getConnId())
                    .setDeviceId(location.getDeviceId())
                    .setTargetServer(location.getHost())
                    .setTargetPort(location.getPort())
                    .setRecipient(new InetSocketAddress(location.getHost(), location.getPort()))
                    .sendRaw();
        } else {
            //2.发送广播
            //TODO 远程机器可能不存在，需要确认下redis 那个通道如果机器不存在的话，是否会存在消息积压的问题。
            RedisKickRemoteMessage message = new RedisKickRemoteMessage()
                    .setPlayerId(playerId)
                    .setClientType(location.getClientType())
                    .setConnId(location.getConnId())
                    .setDeviceId(location.getDeviceId())
                    .setTargetServer(location.getHost())
                    .setTargetPort(location.getPort());
            mqClient.publish(getKickChannel(location.getHostAndPort()), message);
        }
    }

    /**
     * 处理远程机器发送的踢人广播.
     * <p>
     * 一台机器发送广播所有的机器都能收到，
     * 包括发送广播的机器，所有要做一次过滤
     *
     * @param msg
     */
    public void onReceiveKickRemoteMsg(KickRemoteMsg msg) {
        //1.如果当前机器不是目标机器，直接忽略
        if (!msg.isTargetPC()) {
            LogUtils.CONN.error("receive kick remote msg, target server error, localIp={}, msg={}", Utils.getLocalIp(), msg);
            return;
        }

        //2.查询本地路由，找到要被踢下线的链接，并删除该本地路由
        long playerId = msg.getPlayerId();
        int clientType = msg.getClientType();
        LocalRouterManager localRouterManager = RouterCenter.I.getLocalRouterManager();
        LocalRouter localRouter = localRouterManager.lookup(playerId, clientType);
        if (localRouter != null) {
            LogUtils.CONN.info("receive kick remote msg, msg={}", msg);
            if (localRouter.getRouteValue().getId().equals(msg.getConnId())) {//二次校验，防止误杀
                //2.1删除本地路由信息
                localRouterManager.unRegister(playerId, clientType);
                //2.2发送踢人消息到客户端
                kickLocal(playerId, localRouter);
            } else {
                LogUtils.CONN.warn("kick router failure target connId not match, localRouter={}, msg={}", localRouter, msg);
            }
        } else {
            LogUtils.CONN.warn("kick router failure can't find local router, msg={}", msg);
        }
    }

    @Override
    public void receive(String topic, Object message) {
        if (getKickChannel().equals(topic)) {
            KickRemoteMsg msg = JsonUtils.fromJson(message.toString(), RedisKickRemoteMessage.class);
            if (msg != null) {
                onReceiveKickRemoteMsg(msg);
            } else {
                LogUtils.CONN.warn("receive an error kick message={}", message);
            }
        } else {
            LogUtils.CONN.warn("receive an error redis channel={}", topic);
        }
    }
}